package com.hcj.springcloud.service;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;

/**
 * 多线程处理读写文件：
 * <br>
 * txt文件的读写服务：可批量处理很多小文件
 * <pre>
 *     可优化： 对文件子目录的文件做操作
 * </pre>
 */
@Data
@Slf4j
public class DoFileReadAndWriteService {

    /**
     * File-reading task: opens one source file and returns the open stream to the caller.
     *
     * <p>{@link #call()} yields a map with a single entry, keyed by the file's simple name and
     * holding an open {@link FileInputStream}. The stream is not closed here; whoever receives
     * the map is responsible for closing it. The shared latch is counted down exactly once per
     * invocation of {@link #call()}, whether or not the file could be opened.
     */
    @Data
    static class MyCallableProducer implements Callable<Map<String, FileInputStream>>{

        // Latch shared with the submitting thread; counted down when this task finishes.
        private CountDownLatch countDownLatch;
        private File file;
        private FileInputStream fileInputStream = null;
        private Map<String, FileInputStream> fileInputStreamMap = new HashMap<>();

        /**
         * @param countDownLatch latch to count down once this task has finished
         * @param file           the file to open for reading
         */
        public MyCallableProducer(CountDownLatch countDownLatch, File file) {
            this.countDownLatch = countDownLatch;
            this.file = file;
        }

        /**
         * Opens the file and returns it keyed by file name.
         *
         * @return map from the file's name to its open input stream
         * @throws Exception if the file cannot be opened (e.g. {@link FileNotFoundException})
         */
        @Override
        public Map<String, FileInputStream> call() throws Exception {
            try {
                log.info("[{}]\t线程读文件开始：\t[{}]\t时间：\t[{}]", Thread.currentThread().getName(), file.getName(), LocalDateTime.now());
                fileInputStream = new FileInputStream(file);
                fileInputStreamMap.put(file.getName(), fileInputStream);
                log.info("[{}]\t线程读文件结束：\t[{}]\t时间：\t[{}]", Thread.currentThread().getName(), file.getName(), LocalDateTime.now());
                return fileInputStreamMap;
            } finally {
                // Always release the latch: if the open above throws, a thread blocked in
                // await() on this latch would otherwise never be woken up.
                countDownLatch.countDown();
            }
        }

        /**
         * Debug helper: sleeps for a random whole number of seconds in the range 0-9.
         * Not called by {@link #call()}.
         */
        private void doSleep() {
            long millis = ThreadLocalRandom.current().nextInt(10) * 1000L;
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can still observe the interruption.
                Thread.currentThread().interrupt();
                log.warn("[{}] interrupted while sleeping", Thread.currentThread().getName(), e);
            }
        }
    }

    /**
     * 文件写线程
     */
    @Data
    static class MyCallableConsumer implements  Runnable{
        private String  fileName = "";
        private BlockingQueue<Future<Map<String, FileInputStream>>> blockingQueue;
        private FileInputStream fileInputStream = null;
        private File dirFile = null;


        private BufferedReader bufferedReader = null;
        private InputStreamReader inputStreamReader = null;
        private FileWriter fileWriter = null;
        private BufferedWriter bufferedWriter = null;

        // 构造函数
        public MyCallableConsumer(BlockingQueue<Future<Map<String, FileInputStream>>> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            try {
                Future<Map<String, FileInputStream>> future = blockingQueue.take();
                Map<String, FileInputStream> fileInputStreamMap = future.get();

                Set<String> keySet = fileInputStreamMap.keySet();
                Iterator<String> iterator = keySet.iterator();
                while (iterator.hasNext()){
                    fileName = iterator.next();
                    fileInputStream = fileInputStreamMap.get(fileName);
                    log.info("[{}]\t写文件开始：\t[{}] \t时间：[{}]", Thread.currentThread().getName(),fileName, LocalDateTime.now());
                    try  {
                        inputStreamReader = new InputStreamReader(fileInputStream, "utf-8");
                        bufferedReader = new BufferedReader(inputStreamReader);
                        dirFile = new File("e:" + File.separator + "dir_copy" + File.separator + fileName);
                        fileWriter = new FileWriter((dirFile));
                        bufferedWriter = new BufferedWriter(fileWriter);
                        String data = null;
                        //bufferedWriter.write("["+ Thread.currentThread().getName()+"]"+"开始写文件:");
                        while ((data = bufferedReader.readLine()) != null){
                            bufferedWriter.write(data);
                            bufferedWriter.write("\r");
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            bufferedWriter.close();
                            bufferedReader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    log.info("[{}]\t写文件结束：\t[{}] \t时间：[{}]", Thread.currentThread().getName(),fileName, LocalDateTime.now());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * Copies every regular file directly inside {@code e:\dir} using a fixed pool of 10 threads:
     * for each file one read task and one write task are submitted.
     *
     * <p>Sub-directories are skipped. The method returns only after all submitted tasks have
     * finished, or after the calling thread has been interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        File dir = new File("e:" + File.separator + "dir");
        // listFiles() returns null when the path is not a directory or cannot be read.
        File[] files = dir.listFiles();
        if (files == null) {
            log.error("[{}] is not a readable directory, nothing to do", dir);
            return;
        }

        // Only regular files can be opened as streams; sub-directories are left out.
        final List<File> filePathList = new ArrayList<>();
        for (File file : files) {
            if (file.isFile()) {
                filePathList.add(file);
            }
        }

        CountDownLatch countDownLatch = new CountDownLatch(filePathList.size());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // Unbounded, so add() cannot fail however far submission runs ahead of the workers.
        BlockingQueue<Future<Map<String, FileInputStream>>> blockingQueue = new LinkedBlockingQueue<>();

        log.info("[{}]\t:\t{}", "文件读写任务开始", LocalDateTime.now());
        try {
            for (File source : filePathList) {
                Future<Map<String, FileInputStream>> mapFuture =
                        executorService.submit(new MyCallableProducer(countDownLatch, source));
                blockingQueue.add(mapFuture);
                executorService.execute(new MyCallableConsumer(blockingQueue));
            }

            // The latch only covers the read tasks; wait for the pool to drain as well so the
            // end-of-job message is logged after the copies have actually been written.
            countDownLatch.await();
            executorService.shutdown();
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            log.info("[{}]\t:\t{}", "文件读写任务结束", LocalDateTime.now());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("interrupted while waiting for the file copy tasks to finish", e);
        } finally {
            // Idempotent; makes sure the non-daemon worker threads cannot keep the JVM alive
            // when submission or waiting ends abnormally.
            executorService.shutdown();
        }
    }

}
